/*
 * Copyright ConsenSys AG.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 *
 * SPDX-License-Identifier: Apache-2.0
 */
package org.hyperledger.besu.ethereum.api.jsonrpc.websocket;

import org.hyperledger.besu.ethereum.api.handlers.IsAliveHandler;
import org.hyperledger.besu.ethereum.api.handlers.RpcMethodTimeoutException;
import org.hyperledger.besu.ethereum.api.jsonrpc.authentication.AuthenticationService;
import org.hyperledger.besu.ethereum.api.jsonrpc.authentication.AuthenticationUtils;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.JsonRpcRequestContext;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.exception.InvalidJsonRpcParameters;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.JsonRpcMethod;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcError;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcErrorResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcUnauthorizedResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.methods.WebSocketRpcRequest;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;

import java.util.Map;
import java.util.Optional;

import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.DecodeException;
import io.vertx.core.json.Json;
import io.vertx.ext.auth.User;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * Handles JSON-RPC requests received over a WebSocket connection.
 *
 * <p>Each payload is decoded into a {@link WebSocketRpcRequest}, dispatched to the matching {@link
 * JsonRpcMethod} through {@link Vertx#executeBlocking}, and the JSON-encoded response is sent on
 * the Vert.x event bus to the address given by the connection id.
 */
public class WebSocketRequestHandler {

  private static final Logger LOG = LogManager.getLogger();

  private final Vertx vertx;
  private final Map<String, JsonRpcMethod> methods;
  // NOTE(review): package-private rather than private; presumably read by tests -- confirm.
  final EthScheduler ethScheduler;
  private final long timeoutSec;

  /**
   * Creates a handler that dispatches requests to the supplied methods.
   *
   * @param vertx the Vert.x instance used to run requests and to reach the event bus
   * @param methods the available JSON-RPC methods, keyed by method name
   * @param ethScheduler scheduler handed to the {@link IsAliveHandler} of every request
   * @param timeoutSec timeout handed to the {@link IsAliveHandler} of every request
   */
  public WebSocketRequestHandler(
      final Vertx vertx,
      final Map<String, JsonRpcMethod> methods,
      final EthScheduler ethScheduler,
      final long timeoutSec) {
    this.vertx = vertx;
    this.methods = methods;
    this.ethScheduler = ethScheduler;
    this.timeoutSec = timeoutSec;
  }

  /**
   * Handles a request without an authentication service and without a user.
   *
   * @param id the connection id; also the event bus address the response is sent to
   * @param payload the raw JSON-RPC request
   */
  public void handle(final String id, final String payload) {
    handle(Optional.empty(), id, payload, Optional.empty());
  }

  /**
   * Handles a request, checking whether the user is permitted to call the requested method.
   *
   * <p>The request is executed with {@code executeBlocking} using {@code ordered = false}.
   *
   * @param authenticationService the authentication service passed to the permission check
   * @param id the connection id; also the event bus address the response is sent to
   * @param payload the raw JSON-RPC request
   * @param user the user on whose behalf the request is made, if any
   */
  public void handle(
      final Optional<AuthenticationService> authenticationService,
      final String id,
      final String payload,
      final Optional<User> user) {
    vertx.executeBlocking(
        executeHandler(authenticationService, id, payload, user), false, resultHandler(id));
  }

  /**
   * Builds the blocking task that decodes the payload, runs the method and completes the promise
   * with the response object (either the method's response or an error response).
   */
  private Handler<Promise<Object>> executeHandler(
      final Optional<AuthenticationService> authenticationService,
      final String id,
      final String payload,
      final Optional<User> user) {
    return promise -> {
      final WebSocketRpcRequest request;
      try {
        request = Json.decodeValue(payload, WebSocketRpcRequest.class);
      } catch (final IllegalArgumentException | DecodeException e) {
        LOG.debug("Error mapping json to WebSocketRpcRequest", e);
        promise.complete(new JsonRpcErrorResponse(null, JsonRpcError.INVALID_REQUEST));
        return;
      }

      if (request == null) {
        // The JSON literal `null` decodes to a null reference instead of throwing. Without this
        // guard the dereference below fails and the client is told INTERNAL_ERROR.
        LOG.debug("Error mapping json to WebSocketRpcRequest: payload decoded to null");
        promise.complete(new JsonRpcErrorResponse(null, JsonRpcError.INVALID_REQUEST));
        return;
      }

      final JsonRpcMethod method = methods.get(request.getMethod());
      if (method == null) {
        promise.complete(new JsonRpcErrorResponse(request.getId(), JsonRpcError.METHOD_NOT_FOUND));
        LOG.debug("Can't find method {}", request.getMethod());
        return;
      }

      try {
        LOG.debug("WS-RPC request -> {}", request.getMethod());
        request.setConnectionId(id);
        if (AuthenticationUtils.isPermitted(authenticationService, user, method)) {
          final JsonRpcRequestContext requestContext =
              new JsonRpcRequestContext(
                  request, user, new IsAliveHandler(ethScheduler, timeoutSec));
          promise.complete(method.response(requestContext));
        } else {
          promise.complete(
              new JsonRpcUnauthorizedResponse(request.getId(), JsonRpcError.UNAUTHORIZED));
        }
      } catch (final InvalidJsonRpcParameters e) {
        LOG.debug("Invalid Params", e);
        promise.complete(new JsonRpcErrorResponse(request.getId(), JsonRpcError.INVALID_PARAMS));
      } catch (final RpcMethodTimeoutException e) {
        LOG.error(JsonRpcError.TIMEOUT_ERROR.getMessage(), e);
        promise.complete(new JsonRpcErrorResponse(request.getId(), JsonRpcError.TIMEOUT_ERROR));
      } catch (final Exception e) {
        // Boundary catch: any other failure in the method becomes an INTERNAL_ERROR response.
        LOG.error(JsonRpcError.INTERNAL_ERROR.getMessage(), e);
        promise.complete(new JsonRpcErrorResponse(request.getId(), JsonRpcError.INTERNAL_ERROR));
      }
    };
  }

  /**
   * Builds the handler that encodes the outcome of the blocking task and replies to the client. A
   * failed outcome is logged and reported as {@code INTERNAL_ERROR} with a null request id.
   */
  private Handler<AsyncResult<Object>> resultHandler(final String id) {
    return result -> {
      if (result.succeeded()) {
        replyToClient(id, Json.encodeToBuffer(result.result()));
      } else {
        // Log the cause here, otherwise it is lost once the generic error has been sent.
        LOG.error(JsonRpcError.INTERNAL_ERROR.getMessage(), result.cause());
        replyToClient(
            id, Json.encodeToBuffer(new JsonRpcErrorResponse(null, JsonRpcError.INTERNAL_ERROR)));
      }
    };
  }

  /** Sends the encoded response as a string to the event bus address {@code id}. */
  private void replyToClient(final String id, final Buffer response) {
    vertx.eventBus().send(id, response.toString());
  }
}
